---
title: "Control Flow in Kastrax AI Workflows | Workflows | Kastrax Docs"
description: "Learn how to orchestrate complex AI agent operations with Kastrax's powerful workflow control flow features, including sequential execution, parallel processing, branching logic, and loops."
---

# Control Flow in Kastrax AI Workflows ✅

Kastrax's workflow system provides sophisticated control flow capabilities that allow you to orchestrate complex AI agent operations. Whether you need to execute steps sequentially, run operations in parallel, implement conditional branching, or create loops, Kastrax offers a type-safe, declarative API for defining exactly how your workflow should behave.

This guide explains the various control flow patterns available in Kastrax workflows and how to implement them effectively.

## Sequential Execution ✅

Sequential execution is the most basic control flow pattern, where steps are executed one after another in a defined order. This ensures that outputs from one step become inputs for the next step.

```kotlin filename="SequentialWorkflow.kt"
import ai.kastrax.core.workflow.workflow
import ai.kastrax.core.workflow.variable

// Create a workflow with sequential steps
val orderProcessingWorkflow = workflow {
    name = "order-processing"
    description = "Process customer orders sequentially"

    // First step: Fetch order data
    step(fetchOrderAgent) {
        id = "fetch-order"
        name = "Fetch Order Data"
        description = "Retrieve order information from the database"
        variables = mutableMapOf(
            "orderId" to variable("$.input.orderId")
        )
    }

    // Second step: Validate order data (runs after fetch-order)
    step(validationAgent) {
        id = "validate-order"
        name = "Validate Order"
        description = "Validate order data for completeness and correctness"
        after("fetch-order")  // Explicit dependency
        variables = mutableMapOf(
            "orderData" to variable("$.steps.fetch-order.output.data")
        )
    }

    // Third step: Process payment (runs after validate-order)
    step(paymentAgent) {
        id = "process-payment"
        name = "Process Payment"
        description = "Process payment for the validated order"
        after("validate-order")  // Explicit dependency
        variables = mutableMapOf(
            "orderData" to variable("$.steps.fetch-order.output.data"),
            "validationResult" to variable("$.steps.validate-order.output.result")
        )
    }
}
```

In this example, each step explicitly depends on the previous step using the `after()` function, creating a clear sequential flow.

## Parallel Execution ✅

Parallel execution allows multiple steps to run simultaneously when they don't depend on each other. This can significantly improve workflow performance for independent operations.

```kotlin filename="ParallelWorkflow.kt"
import ai.kastrax.core.workflow.workflow
import ai.kastrax.core.workflow.variable

// Create a workflow with parallel steps
val dataProcessingWorkflow = workflow {
    name = "data-processing"
    description = "Process data from multiple sources in parallel"

    // These steps will run in parallel since they don't have dependencies on each other
    step(userDataAgent) {
        id = "fetch-user-data"
        name = "Fetch User Data"
        description = "Retrieve user information"
        variables = mutableMapOf(
            "userId" to variable("$.input.userId")
        )
    }

    step(productDataAgent) {
        id = "fetch-product-data"
        name = "Fetch Product Data"
        description = "Retrieve product information"
        variables = mutableMapOf(
            "productId" to variable("$.input.productId")
        )
    }

    step(inventoryDataAgent) {
        id = "fetch-inventory-data"
        name = "Fetch Inventory Data"
        description = "Retrieve inventory information"
        variables = mutableMapOf(
            "productId" to variable("$.input.productId")
        )
    }

    // This step will only run after all parallel steps have completed
    step(analysisAgent) {
        id = "analyze-data"
        name = "Analyze Combined Data"
        description = "Analyze data from all sources"
        after("fetch-user-data", "fetch-product-data", "fetch-inventory-data")  // Multiple dependencies
        variables = mutableMapOf(
            "userData" to variable("$.steps.fetch-user-data.output.data"),
            "productData" to variable("$.steps.fetch-product-data.output.data"),
            "inventoryData" to variable("$.steps.fetch-inventory-data.output.data")
        )
    }
}
```

In this example, the first three steps run in parallel because they don't have dependencies on each other. The final analysis step only runs after all three parallel steps have completed.

## Branching and Conditional Paths ✅

Branching allows your workflow to take different paths based on conditions or results from previous steps. This is essential for implementing decision logic in your workflows.

```kotlin filename="BranchingWorkflow.kt"
import ai.kastrax.core.workflow.workflow
import ai.kastrax.core.workflow.variable

// Create a workflow with branching paths
val contentWorkflow = workflow {
    name = "content-workflow"
    description = "Create and process content with branching logic"

    // Initial content analysis step
    step(analysisAgent) {
        id = "analyze-content"
        name = "Content Analysis"
        description = "Analyze content quality and type"
        variables = mutableMapOf(
            "content" to variable("$.input.content")
        )
    }

    // Conditional branching based on content quality
    conditionalStep {
        id = "quality-branch"
        name = "Quality Branching"
        description = "Branch based on content quality"
        after("analyze-content")

        // Define the condition to evaluate
        condition { context ->
            val quality = context.getVariable("$.steps.analyze-content.output.qualityScore") as? Double ?: 0.0
            quality >= 8.0  // High quality if score is 8.0 or higher
        }

        // Steps to execute if condition is true (high quality)
        onTrue {
            step(publishAgent) {
                id = "publish-content"
                name = "Publish Content"
                description = "Publish the high-quality content"
                variables = mutableMapOf(
                    "content" to variable("$.input.content")
                )
            }
        }

        // Steps to execute if condition is false (low quality)
        onFalse {
            step(revisionAgent) {
                id = "revise-content"
                name = "Revise Content"
                description = "Improve the low-quality content"
                variables = mutableMapOf(
                    "content" to variable("$.input.content"),
                    "feedback" to variable("$.steps.analyze-content.output.feedback")
                )

                // Continue to publishing after revision
                onComplete {
                    step(publishAgent) {
                        id = "publish-revised-content"
                        name = "Publish Revised Content"
                        description = "Publish the revised content"
                        variables = mutableMapOf(
                            "content" to variable("$.steps.revise-content.output.revisedContent")
                        )
                    }
                }
            }
        }
    }
}
```

In this example, the workflow branches based on the content quality score. High-quality content goes directly to publishing, while low-quality content goes through a revision step before publishing.

## Merging Execution Paths ✅

Merging allows multiple execution paths to converge at a specific step. This is useful when you need to synchronize different branches of your workflow.

```kotlin filename="MergingWorkflow.kt"
import ai.kastrax.core.workflow.workflow
import ai.kastrax.core.workflow.variable

// Create a workflow with merging paths
val researchWorkflow = workflow {
    name = "research-workflow"
    description = "Research a topic from multiple sources and synthesize results"

    // These steps run in parallel
    step(academicResearchAgent) {
        id = "academic-research"
        name = "Academic Research"
        description = "Research from academic sources"
        variables = mutableMapOf(
            "topic" to variable("$.input.topic"),
            "depth" to variable("$.input.academicDepth")
        )
    }

    step(newsResearchAgent) {
        id = "news-research"
        name = "News Research"
        description = "Research from news sources"
        variables = mutableMapOf(
            "topic" to variable("$.input.topic"),
            "timeframe" to variable("$.input.newsTimeframe")
        )
    }

    step(socialMediaResearchAgent) {
        id = "social-research"
        name = "Social Media Research"
        description = "Research from social media"
        variables = mutableMapOf(
            "topic" to variable("$.input.topic"),
            "platforms" to variable("$.input.socialPlatforms")
        )
    }

    // This step merges all research paths
    step(synthesisAgent) {
        id = "research-synthesis"
        name = "Research Synthesis"
        description = "Synthesize research from all sources"
        // This step depends on all three research steps
        after("academic-research", "news-research", "social-research")
        variables = mutableMapOf(
            "academicData" to variable("$.steps.academic-research.output.findings"),
            "newsData" to variable("$.steps.news-research.output.findings"),
            "socialData" to variable("$.steps.social-research.output.findings")
        )
    }

    // Final report generation
    step(reportAgent) {
        id = "generate-report"
        name = "Generate Report"
        description = "Create a comprehensive report"
        after("research-synthesis")
        variables = mutableMapOf(
            "synthesizedData" to variable("$.steps.research-synthesis.output.synthesizedFindings"),
            "format" to variable("$.input.reportFormat")
        )
    }
}
```

In this example, three parallel research paths merge at the synthesis step, which only executes after all three research steps have completed. This ensures that all research data is available before synthesis begins.

## Loops and Iterative Processing ✅

Kastrax supports iterative processing through loop constructs that allow steps to repeat until certain conditions are met. This is essential for tasks that require multiple iterations or refinement cycles.

### Using Loop Steps

```kotlin filename="LoopWorkflow.kt"
import ai.kastrax.core.workflow.workflow
import ai.kastrax.core.workflow.variable

// Create a workflow with a loop
val contentRefinementWorkflow = workflow {
    name = "content-refinement"
    description = "Iteratively refine content until it meets quality standards"

    // Initial content generation
    step(contentGenerationAgent) {
        id = "generate-content"
        name = "Generate Initial Content"
        description = "Create the first draft of content"
        variables = mutableMapOf(
            "topic" to variable("$.input.topic"),
            "style" to variable("$.input.style")
        )
    }

    // Initialize iteration counter
    step(initializationAgent) {
        id = "initialize-loop"
        name = "Initialize Loop Variables"
        description = "Set up variables for the refinement loop"
        after("generate-content")
        execute { context ->
            mapOf(
                "currentContent" to context.getVariable("$.steps.generate-content.output.content"),
                "iterationCount" to 0,
                "qualityScore" to 0.0
            )
        }
    }

    // Refinement loop
    loopStep {
        id = "refinement-loop"
        name = "Content Refinement Loop"
        description = "Iteratively improve content until quality threshold is reached"
        after("initialize-loop")

        // Continue looping while quality is below threshold and iterations are under limit
        condition { context ->
            val qualityScore = context.getVariable("$.steps.evaluate-content.output.qualityScore") as? Double ?: 0.0
            val iterationCount = context.getVariable("$.steps.refine-content.output.iterationCount") as? Int ?: 0
            qualityScore < 8.5 && iterationCount < 5  // Continue if quality < 8.5 and fewer than 5 iterations
        }

        // Loop body
        body {
            // Evaluate current content
            step(evaluationAgent) {
                id = "evaluate-content"
                name = "Evaluate Content Quality"
                description = "Assess the quality of the current content"
                variables = mutableMapOf(
                    "content" to variable("$.steps.refine-content.output.currentContent",
                                         defaultValue = variable("$.steps.initialize-loop.output.currentContent"))
                )
            }

            // Refine content based on evaluation
            step(refinementAgent) {
                id = "refine-content"
                name = "Refine Content"
                description = "Improve content based on evaluation feedback"
                after("evaluate-content")
                variables = mutableMapOf(
                    "content" to variable("$.steps.refine-content.output.currentContent",
                                         defaultValue = variable("$.steps.initialize-loop.output.currentContent")),
                    "feedback" to variable("$.steps.evaluate-content.output.feedback"),
                    "qualityScore" to variable("$.steps.evaluate-content.output.qualityScore"),
                    "iterationCount" to variable("$.steps.refine-content.output.iterationCount", defaultValue = 0)
                )
                execute { context ->
                    val content = context.getVariable("content") as String
                    val feedback = context.getVariable("feedback") as String
                    val iterationCount = (context.getVariable("iterationCount") as Int) + 1
                    val qualityScore = context.getVariable("qualityScore") as Double

                    // In a real implementation, this would use an agent to refine the content
                    // For this example, we're just simulating refinement
                    val refinedContent = "$content\n\nRefined in iteration $iterationCount based on: $feedback"

                    mapOf(
                        "currentContent" to refinedContent,
                        "iterationCount" to iterationCount,
                        "qualityScore" to qualityScore
                    )
                }
            }
        }
    }

    // Final processing after loop completion
    step(finalizationAgent) {
        id = "finalize-content"
        name = "Finalize Content"
        description = "Prepare the refined content for publication"
        after("refinement-loop")
        variables = mutableMapOf(
            "content" to variable("$.steps.refine-content.output.currentContent"),
            "iterationCount" to variable("$.steps.refine-content.output.iterationCount"),
            "finalQuality" to variable("$.steps.evaluate-content.output.qualityScore")
        )
    }
}
```

In this example, the workflow uses a loop to iteratively refine content until it reaches a quality threshold or hits a maximum number of iterations. The loop body contains steps for evaluating and refining the content, with each iteration building on the results of the previous one.

### Using Recursive Patterns

For more complex iterative processes, Kastrax also supports recursive patterns where steps can trigger themselves under certain conditions:

```kotlin filename="RecursiveWorkflow.kt"
import ai.kastrax.core.workflow.workflow
import ai.kastrax.core.workflow.variable

// Create a workflow with recursive processing
val recursiveWorkflow = workflow {
    name = "recursive-processing"
    description = "Process data recursively until completion"

    // Initial data processing
    step(processingAgent) {
        id = "process-data"
        name = "Process Data"
        description = "Process a batch of data"
        variables = mutableMapOf(
            "data" to variable("$.input.data"),
            "batchNumber" to variable("$.input.batchNumber", defaultValue = 1),
            "totalBatches" to variable("$.input.totalBatches")
        )

        // Define what happens after this step completes
        onComplete { result ->
            val batchNumber = result["batchNumber"] as Int
            val totalBatches = result["totalBatches"] as Int

            // If there are more batches to process, trigger the next batch
            if (batchNumber < totalBatches) {
                step(processingAgent) {
                    id = "process-data-${batchNumber + 1}"
                    name = "Process Data Batch ${batchNumber + 1}"
                    description = "Process the next batch of data"
                    variables = mutableMapOf(
                        "data" to variable("$.input.nextBatchData"),
                        "batchNumber" to (batchNumber + 1),
                        "totalBatches" to totalBatches
                    )
                }
            } else {
                // If all batches are processed, move to finalization
                step(finalizationAgent) {
                    id = "finalize-processing"
                    name = "Finalize Processing"
                    description = "Combine and finalize all processed batches"
                    variables = mutableMapOf(
                        "processedBatches" to variable("$.steps", transform = { steps ->
                            // Collect results from all processing steps
                            (steps as Map<String, Any>).filter { it.key.startsWith("process-data") }
                                .map { it.value }
                        })
                    )
                }
            }
        }
    }
}
```

This recursive pattern allows for dynamic workflow generation based on the results of previous steps, enabling complex iterative processes that can't be fully defined in advance.

## Conditional Execution ✅

Kastrax provides several ways to conditionally execute steps based on the results of previous steps or external conditions. This allows for dynamic, adaptive workflows that can respond to changing circumstances.

### Using Conditional Steps

The most direct way to implement conditional logic is using conditional steps:

```kotlin filename="ConditionalExecution.kt"
import ai.kastrax.core.workflow.workflow
import ai.kastrax.core.workflow.variable

// Create a workflow with conditional execution
val approvalWorkflow = workflow {
    name = "approval-workflow"
    description = "Process content with approval conditions"

    // Initial content creation
    step(contentAgent) {
        id = "create-content"
        name = "Create Content"
        description = "Generate initial content"
        variables = mutableMapOf(
            "topic" to variable("$.input.topic")
        )
    }

    // Conditional step for content approval
    conditionalStep {
        id = "approval-check"
        name = "Approval Check"
        description = "Check if content requires approval"
        after("create-content")

        // Define the condition to evaluate
        condition { context ->
            val contentLength = context.getVariable("$.steps.create-content.output.content")?.toString()?.length ?: 0
            val sensitiveTopics = context.getVariable("$.steps.create-content.output.sensitiveTopicsDetected") as? Boolean ?: false

            // Content requires approval if it's long or contains sensitive topics
            contentLength > 1000 || sensitiveTopics
        }

        // Steps to execute if approval is required
        onTrue {
            step(approvalAgent) {
                id = "request-approval"
                name = "Request Approval"
                description = "Send content for human approval"
                variables = mutableMapOf(
                    "content" to variable("$.steps.create-content.output.content"),
                    "approver" to variable("$.input.approverEmail")
                )
            }
        }

        // Steps to execute if approval is not required
        onFalse {
            step(publishAgent) {
                id = "auto-publish"
                name = "Auto-Publish"
                description = "Automatically publish content without approval"
                variables = mutableMapOf(
                    "content" to variable("$.steps.create-content.output.content")
                )
            }
        }
    }
}
```

### Using Step Conditions

You can also add conditions directly to individual steps:

```kotlin
step(publishAgent) {
    id = "publish-content"
    name = "Publish Content"
    description = "Publish content if approved"
    after("request-approval")

    // Only execute this step if the approval was granted
    condition { context ->
        context.getVariable("$.steps.request-approval.output.approved") as? Boolean ?: false
    }

    variables = mutableMapOf(
        "content" to variable("$.steps.create-content.output.content"),
        "channel" to variable("$.input.publishChannel")
    )
}
```

### Using Dynamic Step Generation

For more complex conditional logic, you can dynamically generate steps based on runtime conditions:

```kotlin
step(analysisAgent) {
    id = "analyze-data"
    name = "Analyze Data"
    description = "Analyze input data and determine next steps"
    variables = mutableMapOf(
        "data" to variable("$.input.data")
    )

    // Dynamically determine next steps based on analysis results
    onComplete { result ->
        val dataType = result["dataType"] as? String

        when (dataType) {
            "text" -> {
                step(textProcessingAgent) {
                    id = "process-text"
                    name = "Process Text Data"
                    description = "Process textual data"
                    variables = mutableMapOf(
                        "text" to variable("$.steps.analyze-data.output.extractedText")
                    )
                }
            }
            "image" -> {
                step(imageProcessingAgent) {
                    id = "process-image"
                    name = "Process Image Data"
                    description = "Process image data"
                    variables = mutableMapOf(
                        "imageUrl" to variable("$.steps.analyze-data.output.imageUrl")
                    )
                }
            }
            else -> {
                step(fallbackAgent) {
                    id = "process-unknown"
                    name = "Process Unknown Data"
                    description = "Handle unknown data type"
                    variables = mutableMapOf(
                        "rawData" to variable("$.steps.analyze-data.output.rawData")
                    )
                }
            }
        }
    }
}
```

## Best Practices for Control Flow ✅

When designing workflow control flow, consider these best practices:

### 1. Keep Workflows Readable

Design your workflows to be as readable and maintainable as possible:

```kotlin
// Good: Clear, descriptive step IDs and logical flow
workflow {
    step(dataFetchAgent) { id = "fetch-data" }
    step(validationAgent) { id = "validate-data"; after("fetch-data") }
    step(processingAgent) { id = "process-data"; after("validate-data") }
}

// Avoid: Confusing dependencies and unclear flow
// workflow {
//     step(processingAgent) { id = "process" }
//     step(dataFetchAgent) { id = "fetch" }
//     step(validationAgent) { id = "validate"; after("fetch") }
//     step(finalAgent) { id = "final"; after("process", "validate") }
// }
```

### 2. Use Explicit Dependencies

Always make dependencies between steps explicit:

```kotlin
// Good: Explicit dependencies
step(analysisAgent) {
    id = "analyze-data"
    after("fetch-data", "preprocess-data")  // Explicit dependencies
}

// Avoid: Implicit dependencies through variable references without explicit step dependencies
// step(analysisAgent) {
//     id = "analyze-data"
//     variables = mutableMapOf(
//         "data" to variable("$.steps.fetch-data.output.data")  // Implicit dependency
//     )
// }
```

### 3. Handle Error Cases

Implement proper error handling in your control flow:

```kotlin
conditionalStep {
    id = "data-validation"

    condition { context ->
        val validationErrors = context.getVariable("$.steps.validate-data.output.errors") as? List<String> ?: emptyList()
        validationErrors.isEmpty()  // Check if there are no validation errors
    }

    onTrue {
        step(processingAgent) { id = "process-data" }
    }

    onFalse {
        step(errorHandlingAgent) {
            id = "handle-validation-errors"
            variables = mutableMapOf(
                "errors" to variable("$.steps.validate-data.output.errors")
            )
        }
    }
}
```

### 4. Avoid Overly Complex Workflows

Break down complex workflows into smaller, more manageable sub-workflows:

```kotlin
// Main workflow that orchestrates sub-workflows
val mainWorkflow = workflow {
    name = "main-process"

    // Execute data preparation sub-workflow
    subWorkflowStep {
        id = "data-preparation"
        workflowId = "data-prep-workflow"
        input = mapOf(
            "rawData" to variable("$.input.data")
        )
    }

    // Execute analysis sub-workflow after data preparation
    subWorkflowStep {
        id = "data-analysis"
        after("data-preparation")
        workflowId = "analysis-workflow"
        input = mapOf(
            "preparedData" to variable("$.steps.data-preparation.output.processedData")
        )
    }
}
```

### 5. Test Complex Control Flows

Thoroughly test workflows with complex control flows to ensure they behave as expected:

```kotlin
// Test workflow with different input conditions
fun testWorkflow() = runBlocking {
    val workflowEngine = kastraxSystem.workflowEngine

    // Test with valid data
    val validResult = workflowEngine.executeWorkflow(
        workflowId = "data-processing",
        input = mapOf("data" to validTestData)
    )
    assert(validResult.success)

    // Test with invalid data to verify error handling
    val invalidResult = workflowEngine.executeWorkflow(
        workflowId = "data-processing",
        input = mapOf("data" to invalidTestData)
    )
    assert(invalidResult.steps["handle-validation-errors"] != null)
}
```

By following these best practices, you can create workflows with clear, maintainable control flow that effectively orchestrates your AI agents and tools.

## Data Access Patterns ✅

Kastrax provides several ways to pass data between steps:

1. **Context Object** - Access step results directly through the context object
2. **Variable Mapping** - Explicitly map outputs from one step to inputs of another
3. **getStepResult Method** - Type-safe method to retrieve step outputs

Each approach has its advantages depending on your use case and requirements for type safety.

### Using getStepResult Method

The `getStepResult` method provides a type-safe way to access step results. This is the recommended approach when working with TypeScript as it preserves type information.

#### Basic Usage

For better type safety, you can provide a type parameter to `getStepResult`:

```typescript showLineNumbers filename="src/kastrax/workflows/get-step-result.ts" copy
import { Step, Workflow } from "@kastrax/core/workflows";
import { z } from "zod";

const fetchUserStep = new Step({
  id: 'fetchUser',
  outputSchema: z.object({
    name: z.string(),
    userId: z.string(),
  }),
  execute: async ({ context }) => {
    return { name: 'John Doe', userId: '123' };
  },
});

const analyzeDataStep = new Step({
  id: "analyzeData",
  execute: async ({ context }) => {
    // Type-safe access to previous step result
    const userData = context.getStepResult<{ name: string, userId: string }>("fetchUser");

    if (!userData) {
      return { status: "error", message: "User data not found" };
    }

    return {
      analysis: `Analyzed data for user ${userData.name}`,
      userId: userData.userId
    };
  },
});
```


#### Using Step References

The most type-safe approach is to reference the step directly in the `getStepResult` call:

```typescript showLineNumbers filename="src/kastrax/workflows/step-reference.ts" copy
import { Step, Workflow } from "@kastrax/core/workflows";
import { z } from "zod";

// Define step with output schema
const fetchUserStep = new Step({
  id: "fetchUser",
  outputSchema: z.object({
    userId: z.string(),
    name: z.string(),
    email: z.string(),
  }),
  execute: async () => {
    return {
      userId: "user123",
      name: "John Doe",
      email: "john@example.com"
    };
  },
});

const processUserStep = new Step({
  id: "processUser",
  execute: async ({ context }) => {
    // TypeScript will infer the correct type from fetchUserStep's outputSchema
    const userData = context.getStepResult(fetchUserStep);

    return {
      processed: true,
      userName: userData?.name
    };
  },
});

const workflow = new Workflow({
  name: "user-workflow",
});

workflow
  .step(fetchUserStep)
  .then(processUserStep)
  .commit();
```





### Using Variable Mapping

Variable mapping is an explicit way to define data flow between steps.
This approach makes dependencies clear and provides good type safety.
The data injected into the step is available in the `context.inputData` object, and typed based on the `inputSchema` of the step.

```typescript showLineNumbers filename="src/kastrax/workflows/variable-mapping.ts" copy
import { Step, Workflow } from "@kastrax/core/workflows";
import { z } from "zod";

const fetchUserStep = new Step({
  id: "fetchUser",
  outputSchema: z.object({
    userId: z.string(),
    name: z.string(),
    email: z.string(),
  }),
  execute: async () => {
    return {
      userId: "user123",
      name: "John Doe",
      email: "john@example.com"
    };
  },
});

const sendEmailStep = new Step({
  id: "sendEmail",
  inputSchema: z.object({
    recipientEmail: z.string(),
    recipientName: z.string(),
  }),
  execute: async ({ context }) => {
    const { recipientEmail, recipientName } = context.inputData;

    // Send email logic here
    return {
      status: "sent",
      to: recipientEmail
    };
  },
});

const workflow = new Workflow({
  name: "email-workflow",
});

workflow
  .step(fetchUserStep)
  .then(sendEmailStep, {
    variables: {
      // Map specific fields from fetchUser to sendEmail inputs
      recipientEmail: { step: fetchUserStep, path: 'email' },
      recipientName: { step: fetchUserStep, path: 'name' }
    }
  })
  .commit();
```

For more details on variable mapping, see the [Data Mapping with Workflow Variables](./variables.mdx) documentation.

### Using the Context Object

The context object provides direct access to all step results and their outputs. This approach is more flexible but requires careful handling to maintain type safety.
You can access step results directly through the `context.steps` object:

```typescript showLineNumbers filename="src/kastrax/workflows/context-access.ts" copy
import { Step, Workflow } from "@kastrax/core/workflows";
import { z } from "zod";

const processOrderStep = new Step({
  id: 'processOrder',
  execute: async ({ context }) => {
    // Access data from a previous step
    let userData: { name: string, userId: string };
    if (context.steps['fetchUser']?.status === 'success') {
      userData = context.steps.fetchUser.output;
    } else {
      throw new Error('User data not found');
    }

    return {
      orderId: 'order123',
      userId: userData.userId,
      status: 'processing',
    };
  },
});

const workflow = new Workflow({
  name: "order-workflow",
});

workflow
  .step(fetchUserStep)
  .then(processOrderStep)
  .commit();
```

### Workflow-Level Type Safety

For comprehensive type safety across your entire workflow, you can define types for all steps and pass them to the Workflow
This allows you to get type safety for the context object on conditions, and on step results in the final workflow output.

```typescript showLineNumbers filename="src/kastrax/workflows/workflow-typing.ts" copy
import { Step, Workflow } from "@kastrax/core/workflows";
import { z } from "zod";


// Create steps with typed outputs
const fetchUserStep = new Step({
  id: "fetchUser",
  outputSchema: z.object({
    userId: z.string(),
    name: z.string(),
    email: z.string(),
  }),
  execute: async () => {
    return {
      userId: "user123",
      name: "John Doe",
      email: "john@example.com"
    };
  },
});

const processOrderStep = new Step({
  id: "processOrder",
  execute: async ({ context }) => {
    // TypeScript knows the shape of userData
    const userData = context.getStepResult(fetchUserStep);

    return {
      orderId: "order123",
      status: "processing"
    };
  },
});

const workflow = new Workflow<[typeof fetchUserStep, typeof processOrderStep]>({
  name: "typed-workflow",
});

workflow
  .step(fetchUserStep)
  .then(processOrderStep)
  .until(async ({ context }) => {
    // TypeScript knows the shape of userData here
    const res = context.getStepResult('fetchUser');
    return res?.userId === '123';
  }, processOrderStep)
  .commit();
```

### Accessing Trigger Data

In addition to step results, you can access the original trigger data that started the workflow:

```typescript showLineNumbers filename="src/kastrax/workflows/trigger-data.ts" copy
import { Step, Workflow } from "@kastrax/core/workflows";
import { z } from "zod";

// Define trigger schema
const triggerSchema = z.object({
  customerId: z.string(),
  orderItems: z.array(z.string()),
});

type TriggerType = z.infer<typeof triggerSchema>;

const processOrderStep = new Step({
  id: "processOrder",
  execute: async ({ context }) => {
    // Access trigger data with type safety
    const triggerData = context.getStepResult<TriggerType>('trigger');

    return {
      customerId: triggerData?.customerId,
      itemCount: triggerData?.orderItems.length || 0,
      status: "processing"
    };
  },
});

const workflow = new Workflow({
  name: "order-workflow",
  triggerSchema,
});

workflow
  .step(processOrderStep)
  .commit();
```

### Accessing Resume Data

The data injected into the step is available in the `context.inputData` object, and typed based on the `inputSchema` of the step.

```typescript showLineNumbers filename="src/kastrax/workflows/resume-data.ts" copy
import { Step, Workflow } from "@kastrax/core/workflows";
import { z } from "zod";

const processOrderStep = new Step({
  id: "processOrder",
  inputSchema: z.object({
    orderId: z.string(),
  }),
  execute: async ({ context, suspend }) => {
    const { orderId } = context.inputData;

    if (!orderId) {
      await suspend();
      return;
    }

    return {
      orderId,
      status: "processed"
    };
  },
});

const workflow = new Workflow({
  name: "order-workflow",
});

workflow
  .step(processOrderStep)
  .commit();

const run = workflow.createRun();
const result = await run.start();

const resumedResult = await workflow.resume({
  runId: result.runId,
  stepId: 'processOrder',
  inputData: {
    orderId: '123',
  },
});

console.log({resumedResult});
```

### Accessing Workflow Results

You can get typed access to the results of a workflow by injecting the step types into the `Workflow` type params:

```typescript showLineNumbers filename="src/kastrax/workflows/get-results.ts" copy
import { Workflow } from "@kastrax/core/workflows";

const fetchUserStep = new Step({
  id: "fetchUser",
  outputSchema: z.object({
    userId: z.string(),
    name: z.string(),
    email: z.string(),
  }),
  execute: async () => {
    return {
      userId: "user123",
      name: "John Doe",
      email: "john@example.com"
    };
  },
});

const processOrderStep = new Step({
  id: "processOrder",
  outputSchema: z.object({
    orderId: z.string(),
    status: z.string(),
  }),
  execute: async ({ context }) => {
    const userData = context.getStepResult(fetchUserStep);
    return {
      orderId: "order123",
      status: "processing"
    };
  },
});

const workflow = new Workflow<[typeof fetchUserStep, typeof processOrderStep]>({
  name: "typed-workflow",
});

workflow
  .step(fetchUserStep)
  .then(processOrderStep)
  .commit();

const run = workflow.createRun();
const result = await run.start();

// The result is a discriminated union of the step results
// So it needs to be narrowed down via status checks
if (result.results.processOrder.status === 'success') {
  // TypeScript will know the shape of the results
  const orderId = result.results.processOrder.output.orderId;
  console.log({orderId});
}

if (result.results.fetchUser.status === 'success') {
  const userId = result.results.fetchUser.output.userId;
  console.log({userId});
}
```

### Best Practices for Data Flow

1. **Use getStepResult with Step References for Type Safety**
   - Ensures TypeScript can infer the correct types
   - Catches type errors at compile time

2. **Use Variable Mapping for Explicit Dependencies*
   - Makes data flow clear and maintainable
   - Provides good documentation of step dependencies

3. **Define Output Schemas for Steps**
   - Validates data at runtime
	 - Validates return type of the `execute` function
   - Improves type inference in TypeScript

4. **Handle Missing Data Gracefully**
   - Always check if step results exist before accessing properties
   - Provide fallback values for optional data

5. **Keep Data Transformations Simple**
   - Transform data in dedicated steps rather than in variable mappings
   - Makes workflows easier to test and debug

### Comparison of Data Flow Methods

| Method | Type Safety | Explicitness | Use Case |
|--------|------------|--------------|----------|
| getStepResult | Highest | High | Complex workflows with strict typing requirements |
| Variable Mapping | High | High | When dependencies need to be clear and explicit |
| context.steps | Medium | Low | Quick access to step data in simple workflows |

By choosing the right data flow method for your use case, you can create workflows that are both type-safe and maintainable.

